<?php
namespace app\common\model;

use Elasticsearch\ClientBuilder;
use GuzzleHttp\RequestOptions;
use think\facade\Log;

/**
 * Static helper around the Elasticsearch PHP client: index management,
 * document CRUD, bulk synchronisation from database tables and searching.
 *
 * Every public method logs client exceptions and returns a fallback value
 * (false, [] or 0 depending on the method) instead of throwing.
 */
class EsModel
{
    /**
     * Build a new Elasticsearch client from the application configuration.
     * @author 贺强
     * @time   2022/3/10 10:56
     * @return \Elasticsearch\Client|null the client, or null when it could not be built
     */
    private static function getEsClient()
    {
        try {
            return ClientBuilder::create()
                ->setHosts([self::getEsConfig()])
                ->setConnectionPool('\Elasticsearch\ConnectionPool\SimpleConnectionPool')
                ->setConnectionParams([
                    'client' => [
                        RequestOptions::TIMEOUT => 30,
                        RequestOptions::CONNECT_TIMEOUT => 30,
                    ],
                ])
                ->setRetries(0)
                ->build();
        } catch (\Exception $e) {
            // Record why the client is unavailable instead of failing silently.
            Log::error($e->getMessage());
            return null;
        }
    }

    /**
     * Read the Elasticsearch host definition from the `app.elastic.*` configuration.
     * @author 贺强
     * @time   2022/3/10 10:56
     * @return array host definition with the keys host, port and scheme
     */
    private static function getEsConfig()
    {
        return [
            'host' => config('app.elastic.host', '127.0.0.1'),
            'port' => config('app.elastic.port', '9200'),
            'scheme' => config('app.elastic.scheme', 'http'),
        ];
    }

    /**
     * Run an operation against a freshly built client.
     *
     * Returns $fallback when the client is unavailable or when the operation
     * throws an \Exception (the exception message is logged).
     * @param callable $operation receives the client as its only argument
     * @param mixed $fallback value returned on failure
     * @return mixed result of the operation, or $fallback
     */
    private static function withClient(callable $operation, $fallback)
    {
        try {
            $client = self::getEsClient();
            if (empty($client)) {
                return $fallback;
            }
            return $operation($client);
        } catch (\Exception $e) {
            Log::error($e->getMessage());
            return $fallback;
        }
    }

    /**
     * Send one bulk request and log when Elasticsearch reports item-level failures.
     * @param \Elasticsearch\Client $client client to use
     * @param array $body alternating action / document entries
     * @return array|callable raw bulk response
     */
    private static function sendBulk($client, array $body)
    {
        $response = $client->bulk(['body' => $body]);
        // A bulk request can succeed as a whole while individual items fail.
        if (!empty($response['errors'])) {
            Log::error('Elasticsearch bulk request reported item-level failures');
        }
        return $response;
    }

    /**
     * Create the index for a table when it does not exist yet.
     * @param string $index index name
     * @param mixed $table table name handed to CommonModel::getProperties()
     * @return void
     */
    private static function ensureIndex(string $index, $table)
    {
        if (!self::isExist($index)) {
            self::createIndex($index, CommonModel::getProperties($table));
        }
    }

    /**
     * Tell whether a value is safe to interpolate as a back-quoted SQL identifier.
     * @param mixed $value candidate identifier
     * @return bool true when it consists only of letters, digits and underscores
     */
    private static function isSafeIdentifier($value)
    {
        return is_string($value) && preg_match('/\A[A-Za-z0-9_]+\z/', $value) === 1;
    }

    /**
     * Decide whether the request asked for a debug dump at the given stage.
     *
     * The flag is read from $_GET['debug'], falling back to $_COOKIE['debug'].
     * Recognised values are `es_{prefix}{suffix}` / `{index}_{prefix}{suffix}`
     * (print_r) and `es_{prefix}var_{suffix}` / `{index}_{prefix}var_{suffix}` (var_dump).
     *
     * NOTE(review): any visitor can set this flag and thereby dump query
     * parameters / raw results and abort the request. Consider restricting
     * it to debug environments.
     * @param string $index index name used in the index-specific flag values
     * @param string $suffix stage marker, `w` (before the request) or `r` (after it)
     * @param string $prefix optional infix such as `count_`
     * @return string `print_r`, `var_dump`, or '' when no dump was requested
     */
    private static function debugDumper(string $index, string $suffix, string $prefix = '')
    {
        $flag = $_GET['debug'] ?? ($_COOKIE['debug'] ?? '');
        if (!is_string($flag)) {
            return '';
        }
        if ($flag === "es_{$prefix}{$suffix}" || $flag === "{$index}_{$prefix}{$suffix}") {
            return 'print_r';
        }
        if ($flag === "es_{$prefix}var_{$suffix}" || $flag === "{$index}_{$prefix}var_{$suffix}") {
            return 'var_dump';
        }
        return '';
    }

    /**
     * Print a payload with the selected dumper and terminate the script.
     * @param string $dumper `var_dump` or `print_r`
     * @param mixed $payload value to print
     * @return void
     */
    private static function dumpAndExit(string $dumper, $payload)
    {
        if ($dumper === 'var_dump') {
            var_dump($payload);
        } else {
            print_r($payload);
        }
        exit;
    }

    /**
     * Convert the `$order` argument of getData() into an Elasticsearch sort clause.
     * @param mixed $order array (`['field' => ...]` or a list of field names) or
     *                     string (`'a desc,b'`, or any string containing `[rand]`)
     * @return array sort definition
     */
    private static function buildSort($order)
    {
        if (is_array($order)) {
            $sort = [];
            foreach ($order as $key => $value) {
                if (is_numeric($key)) {
                    // List entry: the value is the field name, ascending by default.
                    $sort[$value] = 'asc';
                } else {
                    $sort[$key] = $value;
                }
            }
            return $sort;
        }
        if (strpos($order, '[rand]') !== false) {
            return [
                '_script' => [
                    'script' => 'Math.random()',
                    'type' => 'number',
                    'order' => 'asc',
                ],
            ];
        }
        $sort = [];
        foreach (explode(',', $order) as $clause) {
            // Split on any run of whitespace so "ctime  desc" still yields a direction.
            $parts = preg_split('/\s+/', trim($clause));
            if ($parts[0] === '') {
                continue;
            }
            $sort[$parts[0]] = $parts[1] ?? 'asc';
        }
        return $sort;
    }

    /**
     * Check whether an index exists.
     * @author 贺强
     * @time   2022/3/18 14:13
     * @param string $index index name
     * @return bool false also when the client is unavailable or the request fails
     */
    public static function isExist(string $index)
    {
        return self::withClient(static function ($client) use ($index) {
            return $client->indices()->exists(['index' => $index]);
        }, false);
    }

    /**
     * Create an index.
     * @author 贺强
     * @time   2022/3/10 10:56
     * @param string $index index name
     * @param array $properties field mappings of the index
     * @param string $refresh_interval refresh interval
     * @return bool true when Elasticsearch acknowledged the creation
     */
    public static function createIndex(string $index, array $properties, string $refresh_interval = '10s')
    {
        $param = [
            'index' => $index,
            'body' => [
                'settings' => [
                    'number_of_shards' => 6,                 // number of primary shards
                    'number_of_replicas' => 1,               // number of replicas
                    'max_result_window' => 100000000,        // upper bound for from + size
                    'refresh_interval' => $refresh_interval, // refresh interval
                ],
                'mappings' => [
                    'properties' => $properties,
                ],
            ],
        ];
        return self::withClient(static function ($client) use ($param) {
            $res = $client->indices()->create($param);
            return !empty($res['acknowledged']);
        }, false);
    }

    /**
     * Delete an index.
     * @author 贺强
     * @time   2022/3/18 10:50
     * @param string $index index name
     * @return bool true when Elasticsearch acknowledged the deletion
     */
    public static function delIndex(string $index)
    {
        return self::withClient(static function ($client) use ($index) {
            $res = $client->indices()->delete(['index' => $index]);
            return !empty($res['acknowledged']);
        }, false);
    }

    /**
     * Update the settings of an index.
     * @author 贺强
     * @time   2022/3/18 14:24
     * @param string $index index name
     * @param array $settings settings to apply
     * @return bool true when the request returned a non-empty response
     */
    public static function setIndexSettings(string $index, array $settings = [])
    {
        $param = [
            'index' => $index,
            'body' => [
                'settings' => $settings,
            ],
        ];
        return self::withClient(static function ($client) use ($param) {
            return !empty($client->indices()->putSettings($param));
        }, false);
    }

    /**
     * Fetch the settings of one or more indices.
     * @author 贺强
     * @time   2022/3/18 14:28
     * @param string|array $index index name, or an array of names
     * @return array|false settings response, or false on failure
     */
    public static function getIndexSettings($index)
    {
        return self::withClient(static function ($client) use ($index) {
            return $client->indices()->getSettings(['index' => $index]);
        }, false);
    }

    /**
     * Update the field mappings of an index.
     *
     * The mapping is sent in the typeless form (`properties` at the top level
     * of the body), matching what createIndex() sends; wrapping it under the
     * index name is rejected by a typeless put-mapping request.
     * @author 贺强
     * @time   2022/3/18 14:35
     * @param string $index index name
     * @param array $mappings field mappings
     * @return bool true when the request returned a non-empty response
     */
    public static function setIndexMappings(string $index, array $mappings = [])
    {
        $param = [
            'index' => $index,
            'body' => [
                '_source' => ['enabled' => true],
                'properties' => $mappings,
            ],
        ];
        return self::withClient(static function ($client) use ($param) {
            return !empty($client->indices()->putMapping($param));
        }, false);
    }

    /**
     * Fetch the mappings of one or more indices.
     * @author 贺强
     * @time   2022/3/18 14:40
     * @param string|array $index index name, or an array of names
     * @return array|false mapping response, or false on failure
     */
    public static function getIndexMappings($index)
    {
        return self::withClient(static function ($client) use ($index) {
            return $client->indices()->getMapping(['index' => $index]);
        }, false);
    }

    /**
     * Index a single document.
     *
     * `$data['id']` is used as the document id when present and non-empty;
     * otherwise no id is sent and Elasticsearch generates one.
     * @author 贺强
     * @time   2022/3/18 11:17
     * @param string $index index name
     * @param array $data document body
     * @return bool true only when the response result is `created`
     */
    public static function add(string $index, array $data)
    {
        $param = [
            'index' => $index,
            'type' => '_doc',
            'body' => $data,
        ];
        if (isset($data['id']) && $data['id'] !== '') {
            $param['id'] = $data['id'];
        }
        return self::withClient(static function ($client) use ($param) {
            $res = $client->index($param);
            return !empty($res['result']) && $res['result'] === 'created';
        }, false);
    }

    /**
     * Bulk-index documents in batches.
     *
     * Each element of $data must provide `index` (the bulk action metadata)
     * and `data` (the document body). Batching counts elements, so it does
     * not depend on the array keys being sequential integers.
     * @author 贺强
     * @time   2022/3/18 15:21
     * @param array $data documents to save
     * @param int $num documents per bulk request; values below 1 fall back to 1000
     * @return int|false number of submitted documents, or false on failure
     */
    public static function saveArr(array $data, int $num = 1000)
    {
        // Guard against a zero or negative batch size (modulo/never-flush problems).
        $batchSize = $num > 0 ? $num : 1000;
        return self::withClient(static function ($client) use ($data, $batchSize) {
            $body = [];
            $pending = 0;
            foreach ($data as $item) {
                $body[] = ['index' => $item['index']];
                $body[] = $item['data'];
                if (++$pending === $batchSize) {
                    self::sendBulk($client, $body);
                    $body = [];
                    $pending = 0;
                }
            }
            if (!empty($body)) {
                self::sendBulk($client, $body);
            }
            return count($data);
        }, false);
    }

    /**
     * Copy one page of database rows into the index `es_index_{table}`.
     *
     * The index is created from CommonModel::getProperties() when missing.
     * `$data['sql']` is executed as given with a LIMIT clause appended, so it
     * must come from trusted code; `offset` and `length` are cast to
     * non-negative integers before being appended.
     * @author 贺强
     * @time   2022/8/25 11:27
     * @param array $data keys: table (required), sql (required), offset (default 0), length (default 100)
     * @return array|callable|int|false bulk response, 0 when the query returned no rows, false on failure
     */
    public static function syncData(array $data)
    {
        if (empty($data['table']) || empty($data['sql'])) {
            return false;
        }
        $table = $data['table']; // name of the table being synchronised
        $index = 'es_index_' . $table;
        $offset = max(0, (int)($data['offset'] ?? 0));
        $length = max(0, (int)($data['length'] ?? 100));
        $sql = $data['sql'] . " LIMIT $offset,$length";
        try {
            self::ensureIndex($index, $table);
            $client = self::getEsClient();
            if (empty($client)) {
                return false;
            }
            $body = [];
            foreach (CommonModel::query($sql) as $row) {
                $meta = [
                    '_index' => $index,
                    '_type' => '_doc',
                ];
                // Rows without an id get an Elasticsearch-generated id instead of an empty one.
                if (isset($row['id']) && $row['id'] !== '') {
                    $meta['_id'] = $row['id'];
                }
                $body[] = ['index' => $meta];
                $body[] = $row;
            }
            if (empty($body)) {
                return 0;
            }
            return self::sendBulk($client, $body);
        } catch (\Exception $e) {
            Log::error($e->getMessage());
            return false;
        }
    }

    /**
     * Rebuild the JSON attribute column of the given rows and save them
     * through CommonModel::modify().
     *
     * For every id the row is loaded from `table`, its attributes are read
     * from the prefixed attribute table as an attr_name => attr_value map,
     * JSON-encoded into the `attr_field` column, and `mtime` is set to now.
     * Processing stops at the first id that is invalid, missing, or fails to save.
     *
     * Table/field names and ids are interpolated into SQL, so they are
     * restricted to letters, digits and `_` (ids additionally allow `-`);
     * anything else makes the method return false.
     * @author 贺强
     * @time   2023/8/23 10:25
     * @param array $data keys: table (table name), ids (comma separated string or array of ids),
     *                    attr_field (attribute column name, also the unprefixed attribute table name),
     *                    field (foreign key column of the attribute table)
     * @return bool true when every id was processed
     */
    public static function syncAttrData(array $data)
    {
        if (empty($data['table']) || empty($data['ids']) || empty($data['attr_field']) || empty($data['field'])) {
            return false;
        }
        $table = $data['table']; // name of the table being synchronised
        $attr_field = $data['attr_field'];
        $field = $data['field'];
        if (!self::isSafeIdentifier($table) || !self::isSafeIdentifier($attr_field) || !self::isSafeIdentifier($field)) {
            return false;
        }
        $index = 'es_index_' . $table;
        $ids = is_array($data['ids']) ? $data['ids'] : explode(',', (string)$data['ids']);
        $attr_table = env('database.prefix', '') . $attr_field;
        if (!self::isSafeIdentifier($attr_table)) {
            return false;
        }
        try {
            self::ensureIndex($index, $table);
            // The client itself is not used below; this only aborts when Elasticsearch is unreachable.
            if (empty(self::getEsClient())) {
                return false;
            }
            foreach ($ids as $id) {
                $id = is_scalar($id) ? trim((string)$id) : '';
                if (preg_match('/\A[A-Za-z0-9_\-]+\z/', $id) !== 1) {
                    return false;
                }
                $rows = CommonModel::query("SELECT * FROM `$table` WHERE `id`='$id'");
                if (empty($rows) || empty($rows[0])) {
                    return false;
                }
                $model = $rows[0];
                $attr_rows = CommonModel::query("SELECT * FROM `$attr_table` WHERE `$field`='$id'");
                $attr = [];
                if (!empty($attr_rows) && !empty($attr_rows[0])) {
                    $attr = array_column($attr_rows, 'attr_value', 'attr_name');
                }
                $model[$attr_field] = json_encode($attr);
                $model['mtime'] = time();
                CommonModel::$tablename = $table;
                if (!CommonModel::modify($model)) {
                    return false;
                }
            }
            return true;
        } catch (\Exception $e) {
            Log::error($e->getMessage());
            return false;
        }
    }

    /**
     * Partially update a document; the document id is taken from `$data['id']`.
     * @author 贺强
     * @time   2022/3/18 11:25
     * @param string $index index name
     * @param array $data fields to update, including `id`
     * @return bool true when the response result is `updated`, `created` or `noop`
     */
    public static function modifyField(string $index, array $data)
    {
        if (!isset($data['id']) || $data['id'] === '') {
            // An update cannot be addressed without a document id.
            Log::error('EsModel::modifyField called without a document id');
            return false;
        }
        $param = [
            'index' => $index,
            'type' => '_doc',
            'id' => $data['id'],
            'body' => [
                'doc' => $data, // partial document merged into the stored one
            ],
        ];
        return self::withClient(static function ($client) use ($param) {
            $res = $client->update($param);
            return isset($res['result']) && in_array($res['result'], ['updated', 'created', 'noop'], true);
        }, false);
    }

    /**
     * Delete a document.
     * @author 贺强
     * @time   2022/3/10 10:56
     * @param string $index index name
     * @param string $id document id
     * @return bool true when the response result is `deleted`
     */
    public static function delFile(string $index, string $id)
    {
        $param = [
            'index' => $index,
            'type' => '_doc',
            'id' => $id,
        ];
        return self::withClient(static function ($client) use ($param) {
            $res = $client->delete($param);
            return !empty($res['result']) && $res['result'] === 'deleted';
        }, false);
    }

    /**
     * Fetch a single document by id.
     *
     * Honours the request-level debug flag (see debugDumper()): the request
     * parameters or the raw response may be printed and the script terminated.
     * @author 贺强
     * @time   2022/3/22 9:07
     * @param string $index index name
     * @param string $id document id
     * @param mixed $field fields to return: array, comma separated string, or null/true for all
     * @return array|callable raw get response, or [] on failure
     */
    public static function getFile(string $index, string $id, $field = null)
    {
        $param = [
            'index' => $index,
            'type' => '_doc',
            'id' => $id,
        ];
        if (!empty($field) && $field !== true) {
            $param['_source'] = is_string($field) ? explode(',', $field) : $field;
        }
        $dumper = self::debugDumper($index, 'w');
        if ($dumper !== '') {
            self::dumpAndExit($dumper, $param);
        }
        return self::withClient(static function ($client) use ($param, $index) {
            $data = $client->get($param);
            $dumper = self::debugDumper($index, 'r');
            if ($dumper !== '') {
                self::dumpAndExit($dumper, $data);
            }
            return $data;
        }, []);
    }

    /**
     * Search an index.
     *
     * Honours the request-level debug flag (see debugDumper()): the request
     * parameters or the raw response may be printed and the script terminated.
     * @author 贺强
     * @time   2022/3/10 10:56
     * @param string $index index name
     * @param mixed $field fields to return: array, comma separated string, or empty/true for all
     * @param array $where query clause placed under `body.query`
     * @param mixed $order sort definition, e.g. ['ctime' => ['order' => 'desc']], 'ctime desc,id', or a string containing `[rand]`
     * @param integer $page page number, starting at 1
     * @param integer $pagesize hits per page
     * @param string $group field for a terms aggregation named `data_group`
     * @param string $distinct field for a cardinality aggregation named `data_distinct`
     * @return array keys list, count and where, plus aggregations when the response has them; [] on failure
     */
    public static function getData(string $index, $field, array $where, $order = [], int $page = 1, int $pagesize = 20, string $group = '', string $distinct = '')
    {
        if ($page < 1) {
            $page = 1;
        }
        $param = [
            'from' => ($page - 1) * $pagesize,
            'size' => $pagesize,
            'index' => $index,
            'type' => '_doc',
        ];
        // query clause
        if (!empty($where)) {
            $param['body']['query'] = $where;
        }
        // restrict the returned fields
        if (!empty($field) && $field !== true) {
            $param['body']['_source'] = is_string($field) ? explode(',', $field) : $field;
        }
        // sorting
        if (!empty($order)) {
            $param['body']['sort'] = self::buildSort($order);
        }
        // Aggregations must live under `aggs`; Elasticsearch rejects unknown body keys.
        if (!empty($group)) {
            $param['body']['aggs']['data_group']['terms']['field'] = $group;
        }
        if (!empty($distinct)) {
            $param['body']['aggs']['data_distinct']['cardinality']['field'] = $distinct;
        }
        return self::withClient(static function ($client) use ($param, $index, $where) {
            $dumper = self::debugDumper($index, 'w');
            if ($dumper !== '') {
                self::dumpAndExit($dumper, $param);
            }
            $data = $client->search($param);
            $dumper = self::debugDumper($index, 'r');
            if ($dumper !== '') {
                // The var_dump variant prints only the hits, the print_r variant the whole response.
                self::dumpAndExit($dumper, $dumper === 'var_dump' ? ($data['hits']['hits'] ?? null) : $data);
            }
            // repackage the response
            $list = $data['hits']['hits'] ?? [];
            $count = $data['hits']['total']['value'] ?? 0;
            $result = compact('list', 'count', 'where');
            if (isset($data['aggregations'])) {
                $result['aggregations'] = $data['aggregations'];
            }
            return $result;
        }, []);
    }

    /**
     * Count the documents matching a query.
     *
     * Without $group the count API is used. The count API accepts only a
     * query, so when $group is given a search with `size` 0 and a terms
     * aggregation named `data_group` is issued instead and the search
     * response (per-group counts under `aggregations.data_group.buckets`)
     * is returned.
     *
     * Honours the request-level debug flag (see debugDumper(), infix `count_`).
     * @author 贺强
     * @time   2023/9/26 20:59
     * @param string $index index name
     * @param array $where query clause placed under `body.query`
     * @param string $group field to group by
     * @return array|callable|int count or search response, or 0 on failure
     */
    public static function getCount(string $index, array $where, string $group = '')
    {
        $grouped = !empty($group);
        $param = [
            'index' => $index,
            'type' => '_doc',
        ];
        // query clause
        if (!empty($where)) {
            $param['body']['query'] = $where;
        }
        if ($grouped) {
            $param['size'] = 0; // only the aggregation buckets are needed
            $param['body']['aggs']['data_group']['terms']['field'] = $group;
        }
        return self::withClient(static function ($client) use ($param, $index, $grouped) {
            $dumper = self::debugDumper($index, 'w', 'count_');
            if ($dumper !== '') {
                self::dumpAndExit($dumper, $param);
            }
            return $grouped ? $client->search($param) : $client->count($param);
        }, 0);
    }
}
